Apache Flink এ Flink SQL এবং Table API হল উচ্চ-স্তরের API যা স্ট্রিম এবং ব্যাচ ডেটা প্রসেসিংয়ের জন্য ব্যবহার করা হয়। এই API গুলো ডেটা স্ট্রিমকে টেবিল আকারে উপস্থাপন করে এবং SQL এবং ট্যাবুলার অপারেশন ব্যবহার করে সহজে ডেটা প্রসেস করতে সাহায্য করে। Flink SQL এবং Table API ব্যবহার করে ডেভেলপাররা রিয়েল-টাইম এবং ব্যাচ ডেটা এনালাইসিস করতে পারেন, যা SQL-এর সাধারণ সিনট্যাক্সে লেখা যায়।
Flink SQL একটি ডেটা প্রসেসিং API যা SQL ভাষা ব্যবহার করে স্ট্রিম এবং ট্যাবুলার ডেটা প্রসেস করতে দেয়। এটি SQL queries এর মাধ্যমে ডেটা ফিল্টারিং, জয়েনিং, এগ্রিগেটিং এবং ট্রান্সফর্মেশন করতে সহায়ক।
// Create a TableEnvironment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Register a Kafka source table
tableEnv.executeSql(
"CREATE TABLE orders (" +
" order_id STRING, " +
" product_id STRING, " +
" quantity INT, " +
" order_time TIMESTAMP(3)" +
") WITH (" +
" 'connector' = 'kafka', " +
" 'topic' = 'orders', " +
" 'properties.bootstrap.servers' = 'localhost:9092', " +
" 'format' = 'json'" +
")"
);
// Run an SQL query
Table result = tableEnv.sqlQuery(
"SELECT product_id, SUM(quantity) AS total_quantity " +
"FROM orders " +
"GROUP BY product_id"
);
// Convert the result table back to a DataStream
DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(result, Row.class);
Flink Table API একটি উচ্চ-স্তরের API যা স্ট্রিম এবং ট্যাবুলার ডেটা প্রসেস করতে সাহায্য করে। এটি SQL-এর মত ঘোষণামূলক হলেও, এটি প্রোগ্রামিং ল্যাঙ্গুয়েজ (Java, Scala) এর সাথে আরও ইন্টিগ্রেটেড এবং টেকসই। Table API ব্যবহার করে ডেভেলপাররা ট্যাবুলার অপারেশনগুলো স্ট্রিম বা ডেটাসেটের উপর পারফর্ম করতে পারেন।
// Create a TableEnvironment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Create a DataStream
DataStream<Order> orderStream = env.fromElements(
new Order("order_1", "product_1", 5),
new Order("order_2", "product_2", 10)
);
// Convert DataStream to Table
Table orders = tableEnv.fromDataStream(orderStream);
// Perform transformation using Table API
Table result = orders
.groupBy($("productId"))
.select($("productId"), $("quantity").sum().as("totalQuantity"));
// Convert the result Table back to DataStream
DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(result, Row.class);
বৈশিষ্ট্য | Flink SQL | Flink Table API |
---|---|---|
Language | SQL স্ট্যান্ডার্ড সিনট্যাক্স | প্রোগ্রামিং ল্যাঙ্গুয়েজ (Java, Scala) ভিত্তিক সিনট্যাক্স |
Flexibility | ডেভেলপারদের জন্য সহজ এবং পরিচিত | আরও প্রোগ্রাম্যাটিক এবং ফ্লেক্সিবল |
Use Case | রিয়েল-টাইম কুইরিং এবং রিপোর্টিং | ডায়নামিক এবং জটিল ডেটা ট্রান্সফর্মেশন |
Functionality | শুধুমাত্র SQL ফাংশন | বিল্ট-ইন এবং কাস্টম ইউজার-ডিফাইন্ড ফাংশন (UDF) সমর্থন করে |
Apache Flink এর Flink SQL এবং Table API ব্যবহার করে ডেভেলপাররা সহজেই বড় আকারের ডেটা প্রসেসিং করতে পারেন এবং রিয়েল-টাইম ডেটা এনালাইসিস ও ট্রান্সফর্মেশন করতে সক্ষম হন।
Flink SQL হলো Apache Flink-এর একটি ফিচার যা স্ট্রিম এবং ব্যাচ ডেটা প্রসেসিং করার জন্য SQL ভাষা ব্যবহার করতে দেয়। এটি Flink-এর ডেটা প্রসেসিং ক্ষমতাকে SQL-ভিত্তিক অ্যাপ্লিকেশনের সাথে একত্রিত করে, যাতে ডেভেলপাররা SQL লিখেই স্ট্রিম এবং ব্যাচ ডেটা বিশ্লেষণ করতে পারে। Flink SQL ব্যবহার করে আপনি ডেটা স্ট্রিমের উপর SQL query চালাতে পারেন, যা অনেকটা রিলেশনাল ডাটাবেসে SQL query চালানোর মতো।
Flink SQL, Apache Calcite এর উপর ভিত্তি করে কাজ করে, যা একটি SQL query parser এবং optimizer। Flink SQL মূলত স্ট্রিম প্রসেসিং API-এর উপর ভিত্তি করে একটি SQL abstraction লেয়ার প্রদান করে। এটি ডেভেলপারদের SQL ব্যবহার করে স্ট্রিম এবং ব্যাচ ডেটা প্রসেসিং করতে দেয়, যা সহজে একটি স্ট্রিম বা টেবিলের ডেটা ফিল্টার, অ্যাগ্রিগেট, এবং ট্রান্সফর্ম করতে সাহায্য করে।
Flink SQL-এর প্রধান উপাদান:
Flink SQL-এর বিভিন্ন ব্যবহার ক্ষেত্র রয়েছে, যা রিয়েল-টাইম ডেটা প্রসেসিং এবং বিশ্লেষণ থেকে শুরু করে ব্যাচ ডেটা প্রসেসিং পর্যন্ত বিস্তৃত। নিচে এর কিছু ব্যবহার ক্ষেত্রের উদাহরণ দেওয়া হলো:
Real-time Analytics:
ETL (Extract, Transform, Load) Jobs:
Complex Event Processing (CEP):
Batch Data Processing:
Data Warehousing এবং BI Integration:
নিচে Flink SQL-এর একটি সাধারণ উদাহরণ দেয়া হলো যেখানে Kafka থেকে স্ট্রিম ডেটা প্রসেস করা হচ্ছে:
-- Kafka টেবিল তৈরি করা
CREATE TABLE input_topic (
user_id STRING,
event_type STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'input-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset'
);
-- প্রসেস করা এবং ফলাফল আউটপুট টেবিলে রাখা
CREATE TABLE output_table (
user_id STRING,
event_count BIGINT
) WITH (
'connector' = 'filesystem',
'path' = 'output/path',
'format' = 'csv'
);
-- SQL query দিয়ে ডেটা প্রসেস করা
INSERT INTO output_table
SELECT
user_id,
COUNT(event_type) AS event_count
FROM input_topic
GROUP BY user_id;
input_topic
নামে একটি টেবিল তৈরি করা হয়েছে, যা Kafka-র একটি টপিক থেকে ডেটা পড়বে।user_id
, event_type
, এবং event_time
, এবং এখানে watermark ব্যবহার করা হয়েছে event time tracking-এর জন্য।output_table
নামে একটি আউটপুট টেবিল তৈরি করা হয়েছে, যা ফাইল সিস্টেমে CSV ফাইল আউটপুট হিসাবে সংরক্ষণ করবে।INSERT INTO
query চালানো হয়েছে যা input_topic
থেকে ডেটা পড়ে user_id
এর উপর ভিত্তি করে ইভেন্ট কাউন্ট করে এবং output_table
এ সংরক্ষণ করে।Flink SQL স্ট্রিম এবং ব্যাচ ডেটা প্রসেসিং এর জন্য একটি শক্তিশালী এবং সহজ মাধ্যম। এটি real-time ডেটা বিশ্লেষণ, ETL, complex event processing এবং data warehousing-এর জন্য একটি কার্যকরী সলিউশন। Flink SQL ডেভেলপারদের SQL-এর সহজতা এবং Flink-এর শক্তিশালী ডেটা প্রসেসিং ক্ষমতাকে একত্রিত করে, যা অ্যাপ্লিকেশন ডেভেলপমেন্টকে আরও সহজ এবং কার্যকরী করে তোলে।
Apache Flink-এর Table API এবং SQL হলো উচ্চ-স্তরের APIs যা ডেটা প্রসেসিংকে সহজ এবং এক্সপ্রেসিভ করে তোলে। এগুলো ব্যবহার করে আমরা স্ট্রিম এবং ব্যাচ ডেটা খুব সহজেই প্রক্রিয়াকরণ করতে পারি, যেখানে Table API জাভা বা স্কালা API হিসেবে কাজ করে এবং SQL পরিচিত SQL সিনট্যাক্স ব্যবহার করে ডেটা প্রক্রিয়াকরণ করে।
Table API একটি রিচ, রিলেশনাল API যা ডেটা প্রসেসিং ও ট্রান্সফরমেশন করতে ট্যাবুলার ডেটা (টেবিল বা ভিউ) ব্যবহার করে। এটি স্ট্রিম এবং ব্যাচ উভয় ডেটার জন্য কাজ করে এবং এতে SQL-এর মতো অপারেশন যেমন select
, filter
, join
, groupBy
ইত্যাদি সাপোর্ট করে।
// Flink Execution Environment এবং Table Environment তৈরি করা
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// সোর্স ডেটা তৈরি করা (ডেমো ডেটাসেট হিসেবে)
DataStream<Tuple2<Integer, String>> dataStream = env.fromElements(
new Tuple2<>(1, "Alice"),
new Tuple2<>(2, "Bob"),
new Tuple2<>(3, "Charlie")
);
// DataStream থেকে টেবিল তৈরি করা
Table table = tableEnv.fromDataStream(dataStream, $("id"), $("name"));
// টেবিল থেকে ডেটা প্রসেসিং (ফিল্টার অপারেশন)
Table filteredTable = table.filter($("id").isGreater(1));
// প্রসেস করা টেবিলকে DataStream এ কনভার্ট করা
DataStream<Tuple2<Integer, String>> resultStream = tableEnv.toDataStream(filteredTable);
বর্ণনা: এখানে, একটি DataStream
থেকে একটি টেবিল তৈরি করা হয়েছে এবং তারপর একটি ফিল্টার অপারেশন প্রয়োগ করা হয়েছে যেখানে id
মান ১-এর বেশি। প্রসেস করার পর, এটি আবার DataStream
এ কনভার্ট করা হয়েছে।
SQL API Flink-এর ট্যাবুলার API-এর একটি অংশ যা স্ট্যান্ডার্ড SQL সিনট্যাক্স ব্যবহার করে স্ট্রিম এবং ব্যাচ ডেটা প্রসেস করতে দেয়। SQL API ব্যবহার করে ডেটা প্রসেসিং করার জন্য আপনাকে Table Environment তৈরি করতে হয় এবং টেবিল বা ভিউ রেজিস্টার করতে হয়।
// Flink Execution Environment এবং Table Environment তৈরি করা
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// সোর্স ডেটা তৈরি করা
DataStream<Tuple2<Integer, String>> dataStream = env.fromElements(
new Tuple2<>(1, "Alice"),
new Tuple2<>(2, "Bob"),
new Tuple2<>(3, "Charlie")
);
// টেবিল রেজিস্টার করা
tableEnv.createTemporaryView("people", tableEnv.fromDataStream(dataStream, $("id"), $("name")));
// SQL Query প্রয়োগ করা
Table result = tableEnv.sqlQuery("SELECT * FROM people WHERE id > 1");
// টেবিলকে DataStream এ কনভার্ট করা
DataStream<Tuple2<Integer, String>> resultStream = tableEnv.toDataStream(result);
বর্ণনা: এখানে, একটি টেবিল তৈরি করা হয়েছে এবং people
নামে একটি টেম্পোরারি ভিউ হিসেবে রেজিস্টার করা হয়েছে। তারপর একটি SQL SELECT
কোয়েরি প্রয়োগ করা হয়েছে যেখানে id
মান ১-এর বেশি।
Flink Table API এবং SQL API একত্রে ব্যবহার করা সম্ভব, যা ডেটা প্রসেসিং আরও ফ্লেক্সিবল করে তোলে। আপনি Table API এর মাধ্যমে টেবিল তৈরি ও প্রসেস করতে পারেন এবং SQL কোয়েরি ব্যবহার করে আরও জটিল অপারেশন করতে পারেন।
Table API এবং SQL API উভয়েই বিভিন্ন সোর্স ও সিংকের সাথে ইন্টিগ্রেট করা যায় যেমন Kafka, HBase, এবং RDBMS। উদাহরণস্বরূপ, আপনি Kafka থেকে ডেটা ইনজেস্ট করতে এবং প্রক্রিয়াকৃত ডেটা অন্য কোনো সিস্টেমে পাঠাতে পারেন।
// Kafka সোর্স এবং সিংক তৈরি করা
String kafkaDDL = "CREATE TABLE kafka_table (" +
" id INT," +
" name STRING" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'input_topic'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")";
tableEnv.executeSql(kafkaDDL);
// SQL কোয়েরি প্রয়োগ করা এবং আউটপুট রেজিস্টার করা
Table result = tableEnv.sqlQuery("SELECT id, name FROM kafka_table WHERE id > 1");
tableEnv.executeSql("CREATE TABLE output_table (" +
" id INT," +
" name STRING" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'output_topic'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")");
// টেবিলের ডেটা সিংকে লিখে দেওয়া
result.executeInsert("output_table");
বর্ণনা: এখানে, kafka_table
নামে একটি Kafka সোর্স রেজিস্টার করা হয়েছে এবং একটি SQL কোয়েরি প্রয়োগ করে প্রক্রিয়াকৃত ডেটা output_table
নামক Kafka সিংকে পাঠানো হয়েছে।
Apache Flink-এর Table API এবং SQL API ডেটা প্রসেসিংকে আরও সহজ এবং কার্যকর করে তোলে। এই API-গুলো স্ট্রিম এবং ব্যাচ ডেটার জন্য এক্সপ্রেসিভ, ফ্লেক্সিবল এবং পারফরম্যান্স-অপটিমাইজড সমাধান প্রদান করে। Flink-এর মাধ্যমে সহজেই বড় আকারের এবং জটিল ডেটা প্রসেসিং কাজগুলো সম্পন্ন করা সম্ভব।
Apache Flink এ Streaming SQL এবং Batch SQL ব্যবহার করে ডেটা স্ট্রিম এবং ব্যাচ ডেটাসেট উভয়ই প্রসেস করা যায়। Flink SQL এর মাধ্যমে ডেভেলপাররা রিয়েল-টাইম ডেটা স্ট্রিম এবং ঐতিহ্যবাহী ব্যাচ ডেটা প্রসেসিং-এর উপর SQL কিউরি চালাতে পারেন। Flink এর SQL API স্ট্রিম এবং ব্যাচ উভয় প্রক্রিয়ার জন্য একটি ইউনিফাইড ইন্টারফেস প্রদান করে, যা ডেটা প্রসেসিং সহজ এবং শক্তিশালী করে তোলে।
Flink এ Streaming SQL হল রিয়েল-টাইম স্ট্রিম ডেটা প্রসেসিং করার জন্য একটি শক্তিশালী টুল। এটি স্ট্রিম ডেটাকে Table হিসেবে উপস্থাপন করে এবং SQL কিউরির মাধ্যমে ডেটার উপর বিভিন্ন ট্রান্সফর্মেশন ও এনালাইসিস করা যায়।
StreamTableEnvironment
তৈরি করতে হবে।// 1. Create StreamTableEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 2. Register a Kafka Source Table
tableEnv.executeSql(
"CREATE TABLE orders (" +
" order_id STRING, " +
" product_id STRING, " +
" quantity INT, " +
" order_time TIMESTAMP(3), " +
" WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND" +
") WITH (" +
" 'connector' = 'kafka', " +
" 'topic' = 'orders', " +
" 'properties.bootstrap.servers' = 'localhost:9092', " +
" 'format' = 'json'" +
")"
);
// 3. Execute a Streaming SQL Query
Table result = tableEnv.sqlQuery(
"SELECT product_id, SUM(quantity) AS total_quantity " +
"FROM orders " +
"GROUP BY product_id, TUMBLE(order_time, INTERVAL '1' HOUR)"
);
// 4. Register a Sink Table and write results
tableEnv.executeSql(
"CREATE TABLE result_sink (" +
" product_id STRING, " +
" total_quantity BIGINT" +
") WITH (" +
" 'connector' = 'print'" +
")"
);
result.executeInsert("result_sink");
এই উদাহরণে:
orders
নামে একটি সোর্স টেবিল রেজিস্টার করা হয়েছে।product_id
এর মোট quantity
গণনা করা হচ্ছে।result_sink
টেবিলে লেখা হচ্ছে।Batch SQL Flink এ ঐতিহ্যবাহী ব্যাচ ডেটা প্রসেসিং করার জন্য ব্যবহৃত হয়। Flink ব্যাচ ডেটাসেটের উপর SQL কিউরি চালাতে পারে এবং প্রয়োজনীয় ট্রান্সফর্মেশন করতে পারে। Flink এর SQL API ব্যাচ প্রসেসিংয়ের জন্যও একই ইন্টারফেস ব্যবহার করে, যা ইউনিফাইড ডেটা প্রসেসিং সিস্টেম তৈরি করে।
TableEnvironment
তৈরি করতে হবে।// 1. Create TableEnvironment
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// 2. Register a File Source Table
tableEnv.executeSql(
"CREATE TABLE sales (" +
" sale_id STRING, " +
" product_id STRING, " +
" quantity INT, " +
" sale_date DATE" +
") WITH (" +
" 'connector' = 'filesystem', " +
" 'path' = 'file:///path/to/sales.csv', " +
" 'format' = 'csv'" +
")"
);
// 3. Execute a Batch SQL Query
Table result = tableEnv.sqlQuery(
"SELECT product_id, SUM(quantity) AS total_quantity " +
"FROM sales " +
"GROUP BY product_id"
);
// 4. Register a Sink Table and write results
tableEnv.executeSql(
"CREATE TABLE result_sink (" +
" product_id STRING, " +
" total_quantity BIGINT" +
") WITH (" +
" 'connector' = 'print'" +
")"
);
result.executeInsert("result_sink");
এই উদাহরণে:
sales
নামে একটি ফাইল সোর্স টেবিল রেজিস্টার করা হয়েছে যা একটি CSV ফাইল থেকে ডেটা পড়ছে।product_id
এর মোট quantity
গণনা করা হয়েছে।result_sink
টেবিলে লেখা হচ্ছে।বৈশিষ্ট্য | Streaming SQL | Batch SQL |
---|---|---|
ডেটা প্রসেসিং | ক্রমাগত এবং রিয়েল-টাইম ডেটা প্রসেসিং | ঐতিহ্যবাহী ব্যাচ ডেটা প্রসেসিং |
SQL ফাংশন | উইন্ডো ফাংশন, অ্যাগ্রিগেশন, টাম্বলিং উইন্ডো, সেশন উইন্ডো | স্ট্যান্ডার্ড SQL ফাংশন এবং অ্যাগ্রিগেশন |
টেবিল টাইপ | স্ট্রিম টেবিল (অবিরত পরিবর্তিত হয়) | স্থায়ী টেবিল (একবার লোড হয়ে স্থির থাকে) |
ইউস কেস | রিয়েল-টাইম অ্যানালাইসিস, ইভেন্ট প্রসেসিং | ঐতিহ্যবাহী ব্যাচ ডেটা এনালাইসিস, রিপোর্টিং |
Flink এ Streaming এবং Batch SQL এর মাধ্যমে ডেভেলপাররা সহজে ডেটা প্রসেসিং করতে পারেন এবং ডেটার উপর দ্রুত ও কার্যকরীভাবে কিউরি চালাতে পারেন। এটি বড় আকারের ডেটা প্রসেসিং এবং রিয়েল-টাইম ডেটা এনালাইসিসের জন্য একটি শক্তিশালী টুল।
Flink SQL ব্যবহার করে স্ট্রিম বা ব্যাচ ডেটা প্রসেসিং-এর জন্য SQL Queries লেখা যায়, যেগুলো ডেটা ফিল্টার, গ্রুপ, অ্যাগ্রিগেট, এবং ট্রান্সফরম করতে পারে। নিচে কিছু সাধারণ উদাহরণসহ Flink SQL queries দেওয়া হলো:
এই কুয়েরি দিয়ে আপনি নির্দিষ্ট কলাম নির্বাচন করতে পারেন এবং শর্ত অনুযায়ী ফিল্টার করতে পারেন। উদাহরণস্বরূপ, আমরা একটি টেবিল থেকে নির্দিষ্ট টাইপের ইভেন্ট ফিল্টার করতে পারি।
SELECT user_id, event_type, event_time
FROM events
WHERE event_type = 'login';
user_id
, event_type
, এবং event_time
কলামগুলো নির্বাচন করা হয়েছে।event_type
হলো 'login'
।কোনো টেবিলের ডেটাকে গ্রুপ করে অ্যাগ্রিগেশন করা যেতে পারে। নিচের উদাহরণে, আমরা প্রতি user_id
ভিত্তিতে ইভেন্টের সংখ্যা গণনা করছি।
SELECT user_id, COUNT(*) AS event_count
FROM events
GROUP BY user_id;
events
টেবিল থেকে প্রতি user_id
অনুযায়ী ইভেন্ট গণনা করা হচ্ছে।COUNT(*)
পুরো টেবিলের রেকর্ড সংখ্যা গণনা করে এবং প্রতিটি user_id
এর জন্য এটি ফেরত দেয়।Flink SQL-এ উইন্ডো অপারেশন খুবই গুরুত্বপূর্ণ, বিশেষ করে স্ট্রিম প্রসেসিং-এর জন্য। নিচের উদাহরণে, ৫ মিনিটের টাম্বলিং উইন্ডোতে প্রতিটি event_type
এর সংখ্যা গণনা করা হচ্ছে।
SELECT
TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end,
event_type,
COUNT(*) AS event_count
FROM events
GROUP BY
TUMBLE(event_time, INTERVAL '5' MINUTE),
event_type;
TUMBLE(event_time, INTERVAL '5' MINUTE)
একটি ৫ মিনিটের টাম্বলিং উইন্ডো তৈরি করে।TUMBLE_START
এবং TUMBLE_END
উইন্ডোর শুরু এবং শেষ সময় ফেরত দেয়।event_type
অনুযায়ী প্রতিটি উইন্ডোতে ইভেন্টের সংখ্যা গণনা করা হয়েছে।Flink SQL-এ HAVING
clause ব্যবহার করে, আপনি গ্রুপ করা ডেটাতে শর্ত প্রয়োগ করতে পারেন। নিচে একটি উদাহরণ দেয়া হলো, যেখানে প্রতি user_id
এর জন্য ইভেন্টের সংখ্যা ১০ এর বেশি হলে সেই রেকর্ডগুলো ফেরত দেয়া হয়েছে।
SELECT user_id, COUNT(*) AS event_count
FROM events
GROUP BY user_id
HAVING COUNT(*) > 10;
GROUP BY
ব্যবহার করে প্রতিটি user_id
অনুযায়ী ইভেন্ট গুনে বের করা হয়েছে।HAVING COUNT(*) > 10
এর মাধ্যমে শুধু সেই user_id
ফেরত দেয়া হচ্ছে যাদের ইভেন্ট সংখ্যা ১০ এর বেশি।Sliding উইন্ডো দিয়ে নির্দিষ্ট সময়ের জন্য উইন্ডো তৈরি করা যায় যা নির্দিষ্ট সময় পর পর স্লাইড করে। নিচে একটি উদাহরণ দেয়া হলো, যেখানে ১০ মিনিটের উইন্ডো এবং ৫ মিনিটের স্লাইড ব্যবহার করা হয়েছে।
SELECT
HOP_START(event_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) AS window_start,
HOP_END(event_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) AS window_end,
event_type,
COUNT(*) AS event_count
FROM events
GROUP BY
HOP(event_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),
event_type;
HOP(event_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)
একটি ১০ মিনিটের উইন্ডো তৈরি করে, যা প্রতি ৫ মিনিট পর পর স্লাইড করে।HOP_START
এবং HOP_END
উইন্ডোর শুরুর এবং শেষের সময় দেখায়।Flink SQL-এ আপনি বিভিন্ন টেবিলের মধ্যে JOIN
ব্যবহার করে ডেটা মিলিয়ে দেখতে পারেন। নিচে একটি উদাহরণ দেয়া হলো, যেখানে orders
এবং customers
টেবিল যোগ করা হয়েছে।
SELECT o.order_id, o.order_date, c.customer_name
FROM orders o
JOIN customers c
ON o.customer_id = c.customer_id;
orders
এবং customers
টেবিলের মধ্যে customer_id
কলামের উপর ভিত্তি করে যোগ করা হয়েছে।Flink SQL-এ WATERMARK
ব্যবহার করে ইভেন্ট টাইমের উপর ভিত্তি করে প্রসেসিং করা যায়, যা লেট ইভেন্টগুলো হ্যান্ডেল করতে সাহায্য করে। নিচে একটি টেবিলের উদাহরণ দেয়া হলো যেখানে event_time
এর উপর ভিত্তি করে watermark তৈরি করা হয়েছে।
CREATE TABLE events (
user_id STRING,
event_type STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '2' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'event-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset'
);
Kafka
থেকে ডেটা পড়ে এবং event_time
কলামের উপর ভিত্তি করে watermark তৈরি করে।WATERMARK FOR event_time AS event_time - INTERVAL '2' SECOND
এর মাধ্যমে ইভেন্ট টাইম থেকে ২ সেকেন্ড পিছিয়ে watermark সেট করা হয়েছে।Flink SQL দিয়ে আপনি বিভিন্ন ধরনের query চালাতে পারেন, যা স্ট্রিম এবং ব্যাচ ডেটা প্রসেসিং-এর জন্য খুবই কার্যকর। এর মাধ্যমে ডেটা ফিল্টার, গ্রুপ, অ্যাগ্রিগেট এবং উইন্ডো প্রসেসিং সহজে করা যায়। Flink SQL-এ ডেটাবেজের মতো টেবিল তৈরি করা, সেগুলোর মধ্যে সম্পর্ক তৈরি করা এবং কাস্টম উইন্ডো এবং ইভেন্ট টাইম প্রসেসিং করার মাধ্যমে ডেটা বিশ্লেষণকে সহজ এবং কার্যকর করা যায়।
Read more